//===----------------------------------------------------------------------===//
//
//                         BusTub
//
// topn_executor.h
//
// Identification: src/include/execution/executors/topn_executor.h
//
// Copyright (c) 2015-2022, Carnegie Mellon University Database Group
//
//===----------------------------------------------------------------------===//

#pragma once

#include <algorithm>
#include <memory>
#include <queue>
#include <utility>
#include <vector>

#include "execution/executor_context.h"
#include "execution/executors/abstract_executor.h"
#include "execution/plans/seq_scan_plan.h"
#include "execution/plans/topn_plan.h"
#include "storage/table/tuple.h"

namespace bustub {

/**
 * A list of ORDER BY terms. Each entry pairs a sort direction (OrderByType) with the expression
 * (AbstractExpressionRef) that is evaluated against a tuple to produce the sort key.
 */
using OrderBys = std::vector<std::pair<OrderByType, AbstractExpressionRef>>;
/**
 * Pairs a tuple with the TopN plan that describes how to order it, so that tuples can be stored in
 * containers that order their elements with operator< (e.g. std::priority_queue).
 */
struct TupleWrapper {
  /**
   * @param plan The TopN plan supplying the ORDER BY terms and the output schema (stored as a non-owning pointer)
   * @param tuple The tuple to wrap; taken by value and moved into the wrapper
   */
  TupleWrapper(const TopNPlanNode *plan, Tuple tuple) : plan_(plan), tuple_(std::move(tuple)) {}

  /**
   * Strict ordering used by std::priority_queue<TupleWrapper>.
   *
   * The ORDER BY terms are examined in sequence; the first term on which the two tuples compare
   * "not equal" decides the result:
   *   - ascending (non-DESC) term: true when this tuple's key is greater than the other's;
   *   - DESC term:                 true when this tuple's key is smaller than the other's.
   * Because std::priority_queue keeps the greatest element (w.r.t. operator<) on top, top() is the
   * tuple that comes first in ORDER BY order.
   *
   * Tuples that tie on every term are reported as equivalent (false in both directions). Returning
   * true for ties would make `a < a` hold, which breaks the strict weak ordering the standard heap
   * algorithms require.
   *
   * @param tuplew The wrapper to compare against
   * @return true if this wrapper is ordered before `tuplew` by the rule above
   */
  auto operator<(const TupleWrapper &tuplew) const -> bool {
    // Bind by reference: copying the term list and the schema on every comparison is wasted work.
    const auto &order_bys = plan_->GetOrderBy();
    const auto &schema = plan_->OutputSchema();
    for (const auto &order_by : order_bys) {
      // Evaluate each side exactly once per term.
      auto lhs = order_by.second->Evaluate(&tuple_, schema);
      auto rhs = order_by.second->Evaluate(&tuplew.tuple_, schema);
      if (lhs.CompareNotEquals(rhs) != CmpBool::CmpTrue) {
        // Equal (or not comparable) on this term; let the next term decide.
        continue;
      }
      if (order_by.first == OrderByType::DESC) {
        return lhs.CompareLessThan(rhs) == CmpBool::CmpTrue;
      }
      return lhs.CompareGreaterThan(rhs) == CmpBool::CmpTrue;
    }
    // Tie on every term: neither tuple is ordered before the other.
    return false;
  }

  /** Plan that defines the ordering; not owned. */
  const TopNPlanNode *plan_;
  /** The wrapped tuple. */
  Tuple tuple_;
};

class PriorityQueue {
 public:
  explicit PriorityQueue(const TopNPlanNode *plan) : plan_(plan), capacity_(plan->GetN()) {
    array_.resize(capacity_ + 1);
  }

  auto Empty() -> bool { return size_ == 0; }

  auto Insert(const Tuple &tuple) -> void {
    if (size_ < capacity_) {
      size_++;
      array_[size_] = tuple;
      Swim(size_);
      return;
    }
    if (Compare(tuple, array_[1])) {
      return;
    }
    array_[1] = tuple;
    Sink(1);
  }

  auto RemoveTop() -> Tuple {
    Tuple top = array_[1];
    array_[1] = array_[size_];
    size_--;
    Sink(1);
    return top;
  }

 private:
  // TopNExecutor *top_n_executor_;
  const TopNPlanNode *plan_;
  size_t capacity_;
  size_t size_ = 0;  // array_[0] in not used. size_ points to the last item.
  std::vector<Tuple> array_;

  void Swap(int index1, int index2) {
    Tuple temp = array_[index1];
    array_[index1] = array_[index2];
    array_[index2] = temp;
  }

  void Swim(int index) {
    while (index != 1 && Compare(array_[index], array_[index / 2])) {
      Swap(index, index / 2);
      index /= 2;
    }
  }

  void Sink(int index) {
    int min_child;
    if (static_cast<size_t>(index) * 2 + 1 <= size_) {
      min_child = Compare(array_[index * 2], array_[index * 2 + 1]) ? index * 2 : index * 2 + 1;
    } else if (static_cast<size_t>(index) * 2 <= size_) {
      min_child = index * 2;
    } else {
      return;
    }

    while (Compare(array_[min_child], array_[index])) {
      Swap(index, min_child);
      index = min_child;
      if (static_cast<size_t>(index) * 2 + 1 <= size_) {
        min_child = Compare(array_[index * 2], array_[index * 2 + 1]) ? index * 2 : index * 2 + 1;
      } else if (static_cast<size_t>(index) * 2 <= size_) {
        min_child = index * 2;
      } else {
        return;
      }
    }
  }

  /**
   * @return true if we prefer tb rather than ta.
   */
  auto Compare(const Tuple &ta, const Tuple &tb) -> bool { return !CompareHelper(ta, tb); }

  /**
   * @return true if ta should be placed before tb.
   */
  auto CompareHelper(const Tuple &ta, const Tuple &tb) -> bool {
    for (const auto &order_by : plan_->order_bys_) {
      Value va = order_by.second->Evaluate(&ta, plan_->OutputSchema());
      Value vb = order_by.second->Evaluate(&tb, plan_->OutputSchema());
      if (va.CompareNotEquals(vb) == CmpBool::CmpTrue) {
        if (order_by.first == OrderByType::DESC) {
          return va.CompareGreaterThan(vb) == CmpBool::CmpTrue;
        }
        return va.CompareLessThan(vb) == CmpBool::CmpTrue;
      }
    }
    return true;
  };
};

/**
 * The TopNExecutor executor executes a topn.
 *
 * This header only declares the executor; the constructor, Init(), Next() and GetNumInHeap() are
 * defined out of line. The state it carries is the plan, one child executor, a
 * std::priority_queue of TupleWrapper (ordered by TupleWrapper::operator<) and a cursor.
 */
class TopNExecutor : public AbstractExecutor {
 public:
  /**
   * Construct a new TopNExecutor instance.
   * @param exec_ctx The executor context
   * @param plan The TopN plan to be executed
   * @param child_executor The child executor from which tuples are obtained (ownership is passed in)
   */
  TopNExecutor(ExecutorContext *exec_ctx, const TopNPlanNode *plan, std::unique_ptr<AbstractExecutor> &&child_executor);

  /** Initialize the TopN */
  void Init() override;

  /**
   * Yield the next tuple from the TopN.
   * @param[out] tuple The next tuple produced by the TopN
   * @param[out] rid The next tuple RID produced by the TopN
   * @return `true` if a tuple was produced, `false` if there are no more tuples
   */
  auto Next(Tuple *tuple, RID *rid) -> bool override;

  /** @return The output schema for the TopN */
  auto GetOutputSchema() const -> const Schema & override { return plan_->OutputSchema(); }

  /** Sets new child executor (for testing only) */
  void SetChildExecutor(std::unique_ptr<AbstractExecutor> &&child_executor) {
    child_executor_ = std::move(child_executor);
  }

  /**
   * @return The number of entries currently held in the heap container. Intended to be queried
   * after each child_executor->Next() call.
   * NOTE(review): the original comment named a `top_entries_` container; the only heap member
   * declared here is `res_` -- confirm against the out-of-line definition.
   */
  auto GetNumInHeap() -> size_t;

 private:
  /** The TopN plan node to be executed */
  const TopNPlanNode *plan_;
  /** The child executor from which tuples are obtained */
  std::unique_ptr<AbstractExecutor> child_executor_;
  /** Heap of wrapped tuples; element order is defined by TupleWrapper::operator<. */
  std::priority_queue<TupleWrapper> res_;
  // Disabled alternative that would use the hand-written heap declared in this header:
  // bustub::PriorityQueue res_;
  /** Starts at 0. NOTE(review): presumably tracks emission progress in Next() -- confirm in the .cpp. */
  size_t cursor_{0};
};
}  // namespace bustub
